package com.jnc.pay.core.vertx.verticle;

import cn.hutool.core.util.StrUtil;
import com.jnc.pay.biz.common.model.AlipayConfig;
import com.jnc.pay.biz.common.model.WechatpayConfig;
import com.jnc.pay.constant.BizConstant;
import com.jnc.pay.constant.RedisConstant;
import com.jnc.pay.core.config.redis.RedisStore;
import com.jnc.pay.core.exception.BusinessException;
import com.jnc.pay.core.exception.IdempotencyException;
import com.jnc.pay.core.exception.LockException;
import com.jnc.pay.core.model.BaseReq;
import com.jnc.pay.core.model.BaseResp;
import com.jnc.pay.core.model.RespCode;
import com.jnc.pay.util.HttpUtil;
import com.jnc.pay.util.convert.ConvertUtil;
import com.jnc.pay.util.convert.JsonUtil;
import com.jnc.pay.util.pay.WechatPayUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.rx.java.ObservableFuture;
import io.vertx.rx.java.RxHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @Author: jjn
 * @Date: 2019/2/1 18:22
 * @Desc: WorkerVerticle：这类Verticle会运行在 Worker Pool 中的线程上。
 * 一个实例绝对不会被多个线程同时执行
 */
@Slf4j
public class WorkerVerticle extends AbstractVerticle {

    @Autowired
    private DeliveryOptions deliveryOptions;
    @Autowired
    private RedisStore redisStore;
    @Autowired
    private WechatPayUtil wechatPayUtil;

    public DeliveryOptions getDeliveryOptions(){
        return deliveryOptions;
    }

    protected void rxSub(BaseReq req){
        ObservableFuture<Message<BaseResp>> observable = RxHelper.observableFuture();
        observable.subscribe(message->{
            BaseResp resp = message.body();
            log.debug("event address: {}, event req: {}, event resp: {}",
                    req.getPath(), JsonUtil.bean2Json(req), JsonUtil.bean2Json(resp));
            HttpUtil.instance.resp(req, resp);
        },error->{
            log.error("event handling exception, event address: {}, event req: {}, exception: {}"
                    , req.getPath(), JsonUtil.bean2Json(req), ExceptionUtils.getStackTrace(error));
            HttpUtil.instance.resp(req, BaseResp.fail("server error"));
        });

        log.debug("event address: {}, event req: {}", req.getPath(), JsonUtil.bean2Json(req));

        vertx.eventBus().request(req.getPath(), req, getDeliveryOptions(), observable.toHandler());
    }

    /**
     * 统一异常处理
     * @param e
     * @param event
     */
    protected void exceptionHandler(Throwable e, Message<Object> event){
        String msg = null;
        if(e instanceof LockException){
            msg = e.getMessage();
        }else if(e instanceof IdempotencyException){
            msg = e.getMessage();
        }else if(e instanceof BusinessException){
            msg = e.getMessage();
        }else{
            msg = "Internal server error";
        }
        event.reply(BaseResp.fail(msg), getDeliveryOptions());
    }

    /**
     * 获取分发路径
     * @param dictType
     * @param req
     * @return
     */
    protected BaseResp getPath(String dictType, BaseReq req){
        BaseResp resp = new BaseResp();
        resp.setCode(RespCode.SUCCESS.getCode()).setMsg(RespCode.SUCCESS.getDesc());
        String path = "";
        try{
            path = ConvertUtil.toStr(redisStore.getHashValues(RedisConstant.DICT_DATA + dictType, req.getPayType()));
        }catch (Exception e){
            log.error("get path error: {}", e);
            resp.setCode(RespCode.ERROR.getCode()).setMsg("Get path exception from redis");
            return resp;
        }

        if(StrUtil.isBlank(path)){
            String msg = "Invalid pay type: " + req.getPayType();
            log.warn("invalid pay type: {}", req.getPayType());
            resp.setCode(RespCode.ERROR.getCode()).setMsg(msg);
            return resp;
        }
        req.setPath(path);
        return resp;
    }

    /**
     * 获取appId，渠道配置信息
     * @param req
     * @return
     */
    protected BaseResp getConfig(BaseReq req){
        BaseResp resp = new BaseResp();
        resp.setCode(RespCode.SUCCESS.getCode()).setMsg(RespCode.SUCCESS.getDesc());
        switch (req.getChannelId()){
            case BizConstant.CHANNEL_ID_WX:
                WechatpayConfig wechatpayConfig = wechatPayUtil.getWechatpayConfig(req.getAppId());
                if(wechatpayConfig == null){
                    String msg = "该appId [" + req.getAppId() + "]，没有绑定有效的微信支付渠道配置信息";
                    log.warn("{}", msg);
                    resp.setCode(RespCode.ERROR.getCode()).setMsg(msg);
                    return resp;
                }
                req.setWechatpayConfig(wechatpayConfig);
                return resp;
            case BizConstant.CHANNEL_ID_ALI:
                AlipayConfig alipayConfig = wechatPayUtil.getAlipayConfig(req.getAppId());
                if(alipayConfig == null){
                    String msg = "该appId [" + req.getAppId() + "]，没有绑定有效的支付宝支付渠道配置信息";
                    log.warn("{}", msg);
                    resp.setCode(RespCode.ERROR.getCode()).setMsg(msg);
                    return resp;
                }
                req.setAlipayConfig(alipayConfig);
                return resp;
            default:
                log.warn("Invalid channelId: {}", req.getChannelId());
                resp.setCode(RespCode.ERROR.getCode()).setMsg("Invalid channelId: " + req.getChannelId());
                return resp;
        }
    }
}
